- Published on
使用 sqlite-utils upsert 失败 Troubleshooting
- Authors
- Name
- Wang Zhiwei
借助 sqlite-utils
实现缓存类:
class Cache:
_columns: Dict = dict()
_pk: Tuple = tuple()
_not_null: Set = set()
_defaults: Dict = dict()
_column_order: List = list()
_json_columns: Set = set()
def __init__(self, sqlite_db: str, name: str, recreate: bool = False):
self.name = name
self.db = Database(sqlite_db, recreate=recreate)
self.table = self.db.create_table(
name=name,
columns=self._columns,
pk=self._pk,
not_null=self._not_null,
defaults=self._defaults,
column_order=self._column_order,
if_not_exists=True,
)
...
class TaskManager(Cache):
_columns = {
"task_id": str,
"db_name": str,
"ms_table_name": str,
"rs_table_name": str,
"local_path": str,
"s3_path": str,
"file_size": int,
"need_staging": int,
"bcp_status": int,
"split_status": int,
"compress_status": int,
"s3_upload_status": int,
"redshift_load_status": int,
"redshift_staging_status": int,
"bcp_cost": float,
"split_cost": float,
"compress_cost": float,
"s3_upload_cost": float,
"redshift_load_cost": float,
"redshift_staging_cost": float,
}
_pk: str = "task_id"
_not_null: Set = {"task_id", "db_name", "ms_table_name", "rs_table_name"}
_column_order: List = list(_columns.keys())
_defaults = {
"file_size": 0,
"need_staging": 0,
"bcp_status": 0,
"split_status": 0,
"compress_status": 0,
"s3_upload_status": 0,
"redshift_load_status": 0,
"redshift_staging_status": 0,
"bcp_cost": 0,
"split_cost": 0,
"compress_cost": 0,
"s3_upload_cost": 0,
"redshift_load_cost": 0,
"redshift_staging_cost": 0,
}
def __init__(self, sqlite_db: str = "tasks.sqlite", name: str = "tasks", recreate: bool = False):
super().__init__(sqlite_db, name, recreate)
...
新增一条记录
tm = TaskManager()
record = {
"task_id": "ab0dff35eff14ea43c4a545ff4f29363",
"db_name": "PRESTAGE",
"ms_table_name": "[COACHING].[Dictionary]",
"rs_table_name": "\"enriched_prestage_coaching\".\"dictionary\"",
"local_path": "xxx",
"s3_path": "s3://xxx",
"file_size": 87521,
"need_staging": 0,
"bcp_status": 1,
"split_status": 1,
"compress_status": 1,
"s3_upload_status": 1,
"redshift_load_status": 1,
"redshift_staging_status": 0,
"bcp_cost": 0.05672574043273926,
"split_cost": 0,
"compress_cost": 0,
"s3_upload_cost": 1.8196437358856201,
"redshift_load_cost": 14.933900356292725,
"redshift_staging_cost": 0
}
tm.table.upsert(record, pk="task_id")
上述操作没有任务报错,但是却并未在 tasks
表中看到任何新增数据,很奇怪,那就通过打断点的方式逐步去 DEBUG。
sqlite-utils Table
类的 upsert
方法调用链:
Table.upsert
-> Table.upsert_all
-> Table.insert_all
-> Table.insert_chunk
insert_chunk
函数中进行数据插入的主要部分代码:
queries_and_params = self.build_insert_queries_and_params(
extracts,
chunk,
all_columns,
hash_id,
hash_id_columns,
upsert,
pk,
conversions,
num_records_processed,
replace,
ignore,
)
with self.db.conn:
result = None
for query, params in queries_and_params:
try:
result = self.db.execute(query, params)
...
queries_and_params
的值:
[
[
"INSERT OR IGNORE INTO [tasks]([task_id]) VALUES(?);",
[
"ab0dff35eff14ea43c4a545ff4f29363"
]
],
[
"UPDATE [tasks] SET [bcp_cost] = ?, [bcp_status] = ?, [compress_cost] = ?, [compress_status] = ?, [db_name] = ?, [file_size] = ?, [local_path] = ?, [ms_table_name] = ?, [need_staging] = ?, [redshift_load_cost] = ?, [redshift_load_status] = ?, [redshift_staging_cost] = ?, [redshift_staging_status] = ?, [rs_table_name] = ?, [s3_path] = ?, [s3_upload_cost] = ?, [s3_upload_status] = ?, [split_cost] = ?, [split_status] = ? WHERE [task_id] = ?",
[
0.05672574043273926,
1,
0,
1,
"PRESTAGE",
87521,
"xxx",
"[COACHING].[Dictionary]",
0,
14.933900356292725,
1,
0,
0,
"\"enriched_prestage_coaching\".\"dictionary\"",
"xxx",
1.8196437358856201,
1,
0,
1,
"ab0dff35eff14ea43c4a545ff4f29363"
]
]
]
可以看到,一次 upsert
插入,最后被拆分成了两条 SQL 语句去执行。第一步插入一条只包含主键的记录,然后再更新一条记录的其它字段值。
INSERT OR IGNORE INTO [tasks]([task_id]) VALUES(?);
UPDATE [tasks] SET [bcp_cost] = ?, [bcp_status] = ?, [compress_cost] = ?, [compress_status] = ?, [db_name] = ?, [file_size] = ?, [local_path] = ?, [ms_table_name] = ?, [need_staging] = ?, [redshift_load_cost] = ?, [redshift_load_status] = ?, [redshift_staging_cost] = ?, [redshift_staging_status] = ?, [rs_table_name] = ?, [s3_path] = ?, [s3_upload_cost] = ?, [s3_upload_status] = ?, [split_cost] = ?, [split_status] = ? WHERE [task_id] = ?
问题在于插入使用了 INSERT OR IGNORE INTO
的语法,插入失败的话也会忽略部分异常,不会提示用户。这里的异常是,对于 tasks
表定义了非空字段 "task_id", "db_name", "ms_table_name", "rs_table_name"
,在执行第一步 INSERT 时只给了主键的值,违背了 NOT NULL
约束,所以无法插入成功,又因为用了 OR IGNORE
语法所以直接忽略了异常不反馈给用户。
在 MySQL 中有 INSERT INTO ON DUPLICATE
语法实现 upsert
,是否 SQLite 不支持所以 sqlite-utils 中才这样实现。
Google 搜索关键词 sqlite insert into on duplicate key update
显示的第一条结果为来自 Stack Overflow 的提问,高赞回答提供了相同的 Solution。
INSERT OR IGNORE INTO visits VALUES ($ip, 0);
UPDATE visits SET hits = hits + 1 WHERE ip LIKE $ip;
https://stackoverflow.com/a/2718352/7700479
在另一个后提交的答案中提到, SQLite version 3.24.0 (2018-06-04) 增加了 upsert
语法支持。
https://www.sqlite.org/draft/lang_UPSERT.html
sqlite-utils upsert
实现的代码提交日期晚于 2018-06-04,作者按理不会不知道 SQLite 的新 feature,我在 commit 里搜索了一番,最后找到这个 issue。
作者描述到,sqlite-utils upsert
方法实现的功能非 SQLite upsert
语法声明,方法命名不恰当。早期的 upsert
方法实现基于 INSERT OR REPLACE INTO
语法, 有些问题,后期改成了 INSERT OR IGNORE INTO
+ UPDATE
的实现。
下面是一段 SQLite 官方文档中关于 INSERT OR REPLACE INTO
的描述:
When a UNIQUE or PRIMARY KEY constraint violation occurs, the REPLACE algorithm deletes pre-existing rows that are causing the constraint violation prior to inserting or updating the current row and the command continues executing normally. If a NOT NULL constraint violation occurs, the REPLACE conflict resolution replaces the NULL value with the default value for that column
https://www.sqlite.org/lang_conflict.html